/**
 * 
 */
package io.potato.ts.service;

import io.potato.core.util.*;
import io.potato.ts.common.Constants;
import io.potato.ts.common.SmsIdWorker;
import io.potato.ts.common.model.SmsInfo;
import io.potato.ts.domain.SmsSendedRecord;
import io.potato.ts.domain.SmsSendingRecord;
import io.potato.ts.sms.SmsApiStatus;
import io.potato.ts.sms.SmsContent;
import io.potato.ts.sms.SmsResponse.SmsID;
import io.potato.ts.sms.SmsSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.jdbc.core.namedparam.BeanPropertySqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
   *   发 短信服务
 * @author timl
 * created: 2019年1月8日 上午10:00:49
 */
@Service
@Slf4j
public class SmsService {
	
	@Autowired
	private NamedParameterJdbcTemplate namedJdbcTemplate;
	
	@Autowired
	private SmsSender smsSender;
	
	@Autowired
	private CodeSegService codeSegService;
	
	@Autowired
	private UserService userService;
	
	@Autowired
	private SmsSendingRecordService ssr;
	
	@Autowired
	private StringRedisTemplate redisTemplate;
	
	/**
	 * 每次发短信最大数量
	 */
	private int batchSize = 100;
	
	/**
	 * 每批次插入数据最大数量
	 */
	private int sqlBatchSize = 2000;
	
	@Value("${sms.default-sign}")
	private String defaultSign;
	
	static final String KEY_PREFIX_MSGID = "sms_msgid:";
	
	static final String KEY_PREFIX_HWMSGID = "sms_hwmsgid:";
	
	/**
	 * 号段运营商映射
	 */
	private Map<String, Integer> dealerMap;
	
	/**
	 * ID生成器
	 */
	@Autowired
	private SmsIdWorker idWorker;
	
	/**
	 * 发送短信的线程池
	 */
	ExecutorService sendSmsExecutor = Executors.newFixedThreadPool(150);
	
	@PostConstruct
	public void init() {
		log.debug("init SmsService");
		this.dealerMap = this.codeSegService.findMap();
	}
	
	/**
	 * 提交批量短信
	 * @param sendingList  要发送的短信记录
	 */
	public void submitBatchSms(String userCode, List<SmsSendingRecord> sendingList) {
		if (sendingList == null || sendingList.isEmpty()) {
			return;
		}
		
		this.batchSaveSendingRecord(userCode, sendingList);
	}
	
	/**
	 * 发送定时短信
	 * @param toSendList
	 */
	public void sendSms(List<SmsSendingRecord> toSendList, Integer senderType) {
		if (toSendList == null || toSendList.isEmpty()) {
			return;
		}
		
		// 按用户分组, 相同用户的放一起发送
		Map<String, List<SmsInfo>> mapByUser = new HashMap<>();
		toSendList.forEach(s -> {
			
			String key = s.getExtendCode();
			SmsInfo smsInfo = new SmsInfo(s.getId(), s.getMsgId(), s.getUserCode(), 
					s.getBatchCode(), s.getExtendCode(), s.getDestNum(), s.getContent(), 
					s.getSendTime(), s.getNeedReport(), s.getTaskName(), s.getSmsType());
			List<SmsInfo> listForUser = mapByUser.get(key);
			if (listForUser == null) {
				listForUser = new ArrayList<SmsInfo>();
				listForUser.add(smsInfo);
				mapByUser.put(key, listForUser);
			} else {
				listForUser.add(smsInfo);
			}
		});
		
		// 按用户发送, 使用多线程发送
		List<CompletableFuture<?>> results = new ArrayList<>();
		for (String extendCode : mapByUser.keySet()) {
			List<SmsInfo> smsList = mapByUser.get(extendCode);
			
			// 按短信内容排序
			Collections.sort(smsList, Comparator.comparing(SmsInfo::getContent));
			if (smsList.size() < batchSize) {
				CompletableFuture<?> r = CompletableFuture.runAsync(() -> {
					sendSmsByExtendCode(smsList.get(0).getUserCode(), extendCode, smsList, senderType);
				}, this.sendSmsExecutor);
				results.add(r);
			} else {
				PagedList<SmsInfo> pagedList = new PagedList<>(smsList, batchSize);
				for (int i = 0; i < pagedList.getTotalPage(); i++) {
					final List<SmsInfo> subList = pagedList.getPage(i);
					CompletableFuture<?> r = CompletableFuture.runAsync(() -> {
						sendSmsByExtendCode(subList.get(0).getUserCode(), extendCode, subList, senderType);
					}, this.sendSmsExecutor);
					results.add(r);
				}
			}
			
			/*CompletableFuture<?> r = CompletableFuture.runAsync(() -> {
				doSendSms(extendCode, smsList, senderType);
			}, this.sendSmsExecutor);*/
			
		}
		// 等待所有线程处理完成
		CompletableFuture<?>[] cfs = new CompletableFuture[results.size()];	
		CompletableFuture.allOf(results.toArray(cfs)).join();	
	}
	
	private void doSendSms(String userCode, List<SmsInfo> smsList, Integer senderType) {
		try {
			// 根据拓展码排序
			//Collections.sort(smsList, Comparator.comparing(SmsInfo::getExtendCode));
			
			// 发送短信
			sendSmsByUser(userCode, smsList, senderType);
		} catch (Throwable e) {
			log.error("send sms error:", e);
		}
	}
	
	/**
	 * 立即发送短信
	 * @param list
	 */
	public void sendSmsByUser(String userCode, List<SmsInfo> list, Integer senderType) {
		
		if (list == null || list.isEmpty()) {
			return;
		}

		// 按拓展码分组
		Map<String, List<SmsInfo>> mapByExtendCode = new HashMap<>();
		
		list.forEach(s -> {
			List<SmsInfo> listForExtendCode = mapByExtendCode.get(s.getExtendCode());
			if (listForExtendCode == null) {
				listForExtendCode = new ArrayList<SmsInfo>();
				listForExtendCode.add(s);
				mapByExtendCode.put(s.getExtendCode(), listForExtendCode);
			} else {
				listForExtendCode.add(s);
			}
		});
		
		// 按拓展码发送
		mapByExtendCode.forEach((extendCode, smsList) -> {
			// 按短信内容排序
			Collections.sort(smsList, Comparator.comparing(SmsInfo::getContent));
			if (smsList.size() < batchSize) {
				sendSmsByExtendCode(userCode, extendCode, smsList, senderType);
			} else {
				PagedList<SmsInfo> pagedList = new PagedList<>(smsList, batchSize);
				for (int i = 0; i < pagedList.getTotalPage(); i++) {
					sendSmsByExtendCode(userCode, extendCode, pagedList.getPage(i), senderType);
				}
			}
			
		});
		
	}
	
	/**
	 * 按拓展码发送短信
	 * @param userCode
	 * @param extendCode
	 * @param smsList
	 * @param senderType
	 */
	private void sendSmsByExtendCode(String userCode, String extendCode, List<SmsInfo> smsList, Integer senderType) {
		String batchCode = smsList.get(0).getBatchCode();
		String extendStr = buildExtend(senderType, userCode, batchCode, extendCode, needReport(smsList));
		LocalDateTime sendTime = LocalDateTime.now();
		List<SmsID> result;
		
		List<Long> ids = new ArrayList<>();
		for (SmsInfo s : smsList) {
			ids.add(s.getId());
		}
		
		// 更新发送次数
		ssr.updateTryTimes(userCode, ids);
		
		// 发送短信
		try {
			result = sendDiffSms(userCode, extendCode, smsList, extendStr);
		} catch (Throwable e) {
			// 调用接口失败，发送短信失败
			log.error("send_sms_failed", e);
			return;
		}
		
		// 保存发送记录到数据库
		// 是否需要异步？
		try {
			saveSendedRecord(userCode, smsList, result, sendTime, senderType);
		} catch (Throwable e) {
			// 保存失败也要继续走下去
			log.error("saveSendedRecord error", e);
		}
		
		// 处理返回状态码为  E200015：待发送短信数量太大 的记录
		if (removeOverflow(smsList, result)) {
			ids = new ArrayList<>();
			for (SmsInfo s : smsList) {
				ids.add(s.getId());
			}
		}
		
		// 修改状态
		if (Constants.SENDER_TYPE_API.equals(senderType)) {
			// API发送的都删掉， 不会再查询了
			//ssr.deleteBatch(ids);
			ssr.updateStatus(userCode, ids, Constants.SendingStatus.SENDED);
		} else {
			// web用户的先保留
			ssr.updateStatus(userCode, ids, Constants.SendingStatus.SENDED);
		}
	}
	
	private boolean removeOverflow(List<SmsInfo> list, List<SmsID> resultList) {
		boolean hasOverflow = false;
		
		for (int i = resultList.size() - 1; i >= 0; i--) {
			if (SmsApiStatus.isOverflow(resultList.get(i).getStatus())) {
				hasOverflow = true;
				list.remove(i);
			}
		}
		
		return hasOverflow;
	}
	
	/**
	 * 保存系统生成的msgId和华为返回的msgId的映射关系
	 * @param msgId  系统生成的msgID
	 * @param hwMsgId 华为的msgID
	 */
	public void saveMsgIdMapping(String msgId, String hwMsgId) {
		try {
			String key = KEY_PREFIX_MSGID + msgId;
			redisTemplate.opsForValue().set(key, hwMsgId, 24, TimeUnit.HOURS);
			
			String key2 = KEY_PREFIX_HWMSGID + hwMsgId;
			redisTemplate.opsForValue().set(key2, msgId, 24, TimeUnit.HOURS);
		} catch (Exception e) {
			log.error("save redis error: " + e.getMessage());
		}
	}
	
	/**
	 * 返回系统生成的MSgId对应的华为的MsgID
	 * @param msgId  系统生成的MSGID
	 * @return  华为的MSGID
	 */
	public String getHwMsgId(String msgId) {
		String key = KEY_PREFIX_MSGID + msgId;
		return this.redisTemplate.opsForValue().get(key);
	}
	
	public String getMsgId(String hwMsgId) {
		String key = KEY_PREFIX_HWMSGID + hwMsgId;
		return this.redisTemplate.opsForValue().get(key);
	}

	
	public List<SmsID> sendDiffSms(String userCode, String extendCode, List<SmsInfo> list, String extendStr) {
		
		if (list.size() <= batchSize) {
			return this.doSendDiffSms(userCode, extendCode, list, extendStr);
		} else {
			List<SmsID> resultList = new ArrayList<>();
			PagedList<SmsInfo> pagedList = new PagedList<>(list, batchSize);
			for (int i = 0; i < pagedList.getTotalPage(); i++) {
				List<SmsID> result = this.doSendDiffSms(userCode, extendCode, pagedList.getPage(i), extendStr);
				resultList.addAll(result);
			}
			return resultList;
		}
	}
	
	private List<SmsID> doSendDiffSms(String userCode, String extendCode, List<SmsInfo> list, String extendStr) {
		log.debug("do send sms " + list.size());

		List<SmsContent> smsList = this.getSmsContentList(list);
		List<SmsID> sendResult = this.smsSender.batchSendDiffSms(smsList, extendCode, extendStr, getSign(userCode));
		return sendResult;
	}

    /**
     * 返回用户的签名
     * @param userCode
     * @return
     */
	private String getSign(String userCode) {
	    if (SmsUtil.isWebUser(userCode)) {
	        return defaultSign;
        }

        String sign = userService.findSignByCode(userCode);
        if (StringUtils.isBlank(sign) || sign.length() > 12) {
            // 如果没有设置 name就用默认的
            return defaultSign;
        } else {
            return sign;
        }
	}
	
	private List<SmsContent> getSmsContentList(List<SmsInfo> list) {
		List<SmsContent> smsList = new ArrayList<>();
		List<String> mobiles = null;
		String lastContent = null;
		
		for (SmsInfo smsInfo : list) {
			
			if (smsInfo.getContent().equals(lastContent)) {
				  mobiles.add(smsInfo.getMobile());
			} else {
				if (mobiles != null && mobiles.size() > 0) {
					smsList.add(new SmsContent(mobiles, lastContent));
				}
				mobiles = new ArrayList<>();
				mobiles.add(smsInfo.getMobile());
				lastContent = smsInfo.getContent();
			}
		}
		
		if (mobiles != null && mobiles.size() > 0) {
			smsList.add(new SmsContent(mobiles, lastContent));
		}
		return smsList;
	}
	
	/**
	 * 判断是否需要短信报告
	 * @param list
	 * @return
	 */
	private Integer needReport(List<SmsInfo> list) {
		// 只要有一条短信需要短信报告，全部需要
		for (SmsInfo smsInfo : list) {
			if (smsInfo.getNeedReport() > 0) {
				return 1;
			}
		}
		return 0;
	}
	
	/**
	 * 扩展参数
	 * @param senderType  发送者类型：1 API  2 WEB
	 * @param extendCode  拓展码， 最大7位
	 * @return
	 */
	private String buildExtend(Integer senderType, String userCode, String batchCode, String extendCode, Integer needReport) {
		if (senderType == null) {
			senderType = 0;
		}
		return senderType + "_" + userCode + "_" + batchCode + "_" + extendCode  + "_" + needReport;
	}
	
	/**
	 * 发送内容是否一样
	 * @param list
	 * @return
	 */
	@SuppressWarnings("unused")
	private boolean isSameContent(List<SmsInfo> list) {
		
		if (list.size() == 1) {
			return true;
		}
		
		Set<String> set = new HashSet<>();
		list.forEach(s -> set.add(s.getContent()));
		return set.size() == 1;
	}

	
	/**
	 * 批量插入待发送记录
	 * @param sendingList  待发送列表
	 */
	private void batchSaveSendingRecord(String userCode, List<SmsSendingRecord> sendingList) {
		String table = TableUtil.getSendingRecordTable(userCode);
		String sql = "INSERT INTO " + table
				+ " (user_code, batch_code, extend_code, dest_num, sender_type, sms_type, status, task_name, msg_id, content, send_time, lot, need_report, seq, try_times) "
				+ " VALUES "
				+ " (:userCode, :batchCode, :extendCode, :destNum, :senderType, :smsType, :status, :taskName, :msgId, :content, :sendTime, :lot, :needReport, :seq, :tryTimes)";
		
		this.batchUpdate(sql, sendingList);
	}
	
	private void saveSendedRecord(String userCode, List<SmsInfo> list, List<SmsID> resultList, LocalDateTime sendTime, Integer senderType) {
		
		LocalDateTime now = LocalDateTime.now();
		long sendDur = LdtUtil.toMilliSeconds(now) - LdtUtil.toMilliSeconds(sendTime);
		
		HashMap<String, Integer> sendedMap = new HashMap<String, Integer>();
		List<SmsSendedRecord> sendedList = new ArrayList<>();
		
		String lot = Strings.uuid();
		Integer userId = tryGetUserId(userCode);
		
		for (int i = 0; i < list.size(); i++) {
			SmsInfo s = list.get(i);
			SmsID smsID =  resultList.get(i);
			boolean zeroSend = false;
			
			if (Constants.NeedReport.YES.equals(s.getNeedReport()) && smsID.getSmsMsgId() != null) {
				// 需要状态报告
				// 保存系统生成msgID和华为MsgId的映射关系，将来用来接口查询状态报告
				saveMsgIdMapping(s.getMsgId(), smsID.getSmsMsgId());
			} 
			
			String key = s.getExtendCode() + "_" + s.getMobile() + "_" + s.getContent();
			if (sendedMap.containsKey(key)) {
				// 相同号码相同内容只会发送一条
				//continue;  TODO: 暂时全部入库
				zeroSend = true;
			} else {
				sendedMap.put(key, Integer.MAX_VALUE);
			}
			
			SmsSendedRecord rec = new SmsSendedRecord();
			rec.setId(nextId());
			rec.setContent(s.getContent());
			rec.setTotal(zeroSend ? 0 : SmsUtil.getSmsCount(defaultSign, s.getContent()));  //计算分拆成多少条短信来发送
			rec.setDealerId(this.getDealerId(dealerMap, s.getMobile()));
			rec.setDestNum(s.getMobile());
			
			rec.setLot(lot);
			
			rec.setSenderType(senderType);
			rec.setSendStatus(SmsApiStatus.getSendStatus(smsID.getStatus()));  // 发送状态
			if (Constants.SENDER_TYPE_WEB.equals(senderType)) {
				rec.setReceiveStatus(-1);  // 正在发送
			} else {
				rec.setReceiveStatus(0);  // 默认为发送成功， 如果接收到的状态报告为失败 才更新为失败
			}
			
			rec.setUserId(userId);
			rec.setUserCode(userCode);
			rec.setBatchCode(s.getBatchCode());
			rec.setExtendCode(s.getExtendCode());
			
			rec.setSubmitTime(s.getSubmitTime());  // 提交发送时间
			rec.setSendTime(sendTime);  // 实际发送时间
			rec.setSendDur(sendDur);  // 从提交发送到实际发送完花的时间
			rec.setSendDelay(LdtUtil.toMilliSeconds(sendTime) - LdtUtil.toMilliSeconds(s.getSubmitTime()));  // 从提交发送到实际开始发送的时间
			
			rec.setTaskName(s.getTaskName());
			rec.setMsgId(s.getMsgId());
			rec.setHwMsgId(smsID.getSmsMsgId());
			
			rec.setSmsType(s.getSmsType());
			
			sendedList.add(rec);
		};
		long start = System.currentTimeMillis();
		this.batchSaveSendedRecord(userCode, sendedList);
		long dur = System.currentTimeMillis() - start;
		log.debug("save " + sendedList.size() + " records in " + dur + " ms " );
	}
	
	private Integer tryGetUserId(String userCode) {
		try {
			return userService.findIdByCode(userCode);
		} catch (Exception e) {
			log.error("get userid error: " + e.getMessage());
			return 0;
		}
	}
	
	/**
	 * 返回号码所属的运营商
	 * @param dealerMap 号段运营商映射关系Map
	 * @param num 手机号码
	 * @return  运营商ID
	 */
	private Integer getDealerId(Map<String, Integer> dealerMap, String num) {
		// 号段配置 有三位，四位， 五位
		for (int i = 3; i <= 5; i ++) {
			String seg = num.substring(0, i);
			if (dealerMap.containsKey(seg)) {
				return dealerMap.get(seg);
			}
		}
		// 如果找不到就默认中国移动
		return Constants.Dealer.CHINA_MOBILE;
	}
	
	/**
	 * 生成唯一ID
	 * @return
	 */
	private Long nextId() {
		return idWorker.nextId();
	}
	
	/**
	 * 批量保存已发送记录
	 * @param sendedList 已发送列表
	 */
	private void batchSaveSendedRecord(String userCode, List<SmsSendedRecord> sendedList) {
		LocalDateTime sendTime = sendedList.get(0).getSendTime();
		String tableName = TableUtil.getSendedRecordTable(userCode, sendTime);
		String sql = "INSERT INTO " + tableName
				+ " (id, user_id, user_code, batch_code, extend_code, dest_num, sender_type, sms_type, send_status, task_name, content, total, submit_time, send_time, send_delay, send_dur, lot, dealer_id, msg_id, hw_msg_id, receive_status) "
				+ " VALUES "
				+ " (:id, :userId, :userCode, :batchCode, :extendCode, :destNum, :senderType, :smsType, :sendStatus, :taskName, :content, :total, :submitTime, :sendTime, :sendDelay, :sendDur, :lot, :dealerId, :msgId, :hwMsgId, :receiveStatus)";
		
		this.batchUpdate(sql, sendedList);
	}
	
	private void batchUpdate(String sql, List<?> list) {
		if (list.size() <= sqlBatchSize) {
			this.doBatchUpdate(sql, list);
		} else {
			// 数据太多， 需要分批执行
			PagedList<?> pagedList = new PagedList<>(list, sqlBatchSize);
			for (int i = 0; i < pagedList.getTotalPage(); i++) {
				this.doBatchUpdate(sql, pagedList.getPage(i));
			}
		}
	}
	
	private void doBatchUpdate(String sql, List<?> list) {
		BeanPropertySqlParameterSource[] batchParam = new BeanPropertySqlParameterSource[list.size()];
		for (int i = 0; i < batchParam.length; i++) {
			batchParam[i] = new BeanPropertySqlParameterSource(list.get(i));
		}
		
		namedJdbcTemplate.batchUpdate(sql, batchParam);
	}
	
}
